package com.atguigu.flink.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/13
 *
 类型：  时间，计数
 特征：  滚动，滑动，会话
 计算并行度： 全局，keyed

 演示： 全局时间窗口
            滚动
            滑动
            会话

    TimeWindow:
            size:  是一个时间范围 [start,end)
            slide:  到end这个时间点就触发运算。

    在flink中有两种时间:
        processingTime(演示): 时间的推进，不受人为干扰，参考现实世界的物理时钟。
        eventTime:  时间从数据中提取，数据说现在几点，就是几点。

 */
public class Demo3_TimeWindowAll
{
    public static void main(String[] args) {
        
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 3333);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.setParallelism(2);

         env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            .keyBy(WaterSensor::getId)

            .windowAll(
                /*
                    滚动:  时间范围的起始时间从时间戳0开始
                            size = 5秒(窗口中存5秒的数据)， slide = 5s(每5s触发一次窗口运行)
                            第一个窗口:  [0,5000)
                            第二个窗口:  [5000,10000)
                            .....
                         有数据，才会触发窗口的构造。

                    滑动： 时间范围的起始时间不是从0开始。
                            从 slide - size 开始。

                    会话:  只需要设置gap。
                                两条元素到达的时间间隔等于gap，上一个窗口就关闭。
                                最后一条元素到达后，超过gap时间，没有新的元素到达，窗口就关闭
                 */
               // TumblingProcessingTimeWindows.of(Time.seconds(5l))
                //SlidingProcessingTimeWindows.of(Time.seconds(5l),Time.seconds(3l))
                ProcessingTimeSessionWindows.withGap(Time.seconds(5l))
            )
            .process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>()
            {
                @Override
                public void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                    //获取当前窗口的时间范围
                    TimeWindow window = context.window();
                    out.collect(MyUtil.parseTimeWindow(window)+":"+MyUtil.parseToList(elements));


                }
            })
            .print();

        
                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
        
    }
}
